NIFI-15483: Route PublishAMQP to failure on undeliverable messages instead of silent success #11213

Open
rakesh-rsky wants to merge 1 commit into apache:main from rakesh-rsky:fix/NIFI-15483-publishamqp-route-on-failure

@rakesh-rsky
Contributor

rakesh-rsky commented May 6, 2026

Summary

PublishAMQP routes FlowFiles to REL_SUCCESS even when the AMQP broker cannot deliver the message, causing silent data loss.

Two failure modes are addressed:

  1. Undeliverable message (basic.return) — broker returns the message when no queue is bound to the exchange/routing-key. The fix uses AMQP Publisher Confirms + basic.return to detect and surface delivery failures, routing the FlowFile to REL_FAILURE.

  2. Exchange not found (ShutdownSignalException) — when the exchange does not exist, the broker closes the channel with 404 NOT_FOUND, causing waitForConfirms() to throw ShutdownSignalException. This is now caught and converted to AMQPException so the FlowFile routes to REL_FAILURE instead of causing an unhandled processor failure.

Testing

  • Publish to an exchange with no bound queues → FlowFile routes to failure with the broker's return reason
  • Publish to a non-existent exchange → FlowFile routes to failure instead of unhandled processor error
  • Normal publish (queue bound) → still routes to success
  • Added regression tests for all failure scenarios — verified to fail against unfixed code

Fixes: https://issues.apache.org/jira/browse/NIFI-15483

rakesh-rsky force-pushed the fix/NIFI-15483-publishamqp-route-on-failure branch from 5da2156 to ace38df on May 6, 2026 11:09
@turcsanyip
Contributor

@rakesh-rsky Thanks for working on this issue. Please note the unit tests are failing.

I tried the new error handling in my local environment but I'm getting the following runtime error (instead of routing the FlowFile to failure):

2026-05-06 14:34:55,697 ERROR [Timer-Driven Process Thread-4] o.a.nifi.amqp.processors.PublishAMQP PublishAMQP[id=4fd4ee4e-8e3a-3969-7db4-953498193e4d] Processor failure
com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'dummy' in vhost '/', class-id=60, method-id=40)
	at com.rabbitmq.client.impl.ChannelN.waitForConfirms(ChannelN.java:219)
	at org.apache.nifi.amqp.processors.AMQPPublisher.publish(AMQPPublisher.java:108)
	at org.apache.nifi.amqp.processors.PublishAMQP.processResource(PublishAMQP.java:185)
	at org.apache.nifi.amqp.processors.PublishAMQP.processResource(PublishAMQP.java:52)
	at org.apache.nifi.amqp.processors.AbstractAMQPProcessor.onTrigger(AbstractAMQPProcessor.java:236)
	at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
	at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1292)
	at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:229)
	at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:102)
	at org.apache.nifi.engine.FlowEngine.lambda$wrap$1(FlowEngine.java:105)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:358)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'dummy' in vhost '/', class-id=60, method-id=40)
	at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:529)
	at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:350)
	at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:193)
	at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:125)
	at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:768)
	at com.rabbitmq.client.impl.AMQConnection.access$400(AMQConnection.java:49)
	at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:695)
	... 1 common frames omitted

turcsanyip self-requested a review on May 6, 2026 12:43
@rakesh-rsky
Contributor Author

@turcsanyip Thank you for validating this.

Root cause: When the exchange does not exist, the broker closes the channel with 404 NOT_FOUND, causing waitForConfirms() to throw ShutdownSignalException — which was not caught, resulting in an unhandled processor failure instead of routing to REL_FAILURE.

This has been fixed in the latest commits — ShutdownSignalException is now caught and converted to AMQPException, along with regression tests covering this and related broker failure scenarios.
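
A minimal sketch of the conversion described above, for illustration only and not the actual patch. So that it compiles without the RabbitMQ client or NiFi on the classpath, every type here is a hypothetical stand-in: `ChannelClosedException` plays the role of `ShutdownSignalException`, `PublishFailedException` plays the role of `AMQPException`, `ConfirmingChannel` is a reduced view of the channel, and `Route` stands for the success and failure relationships.

```java
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for com.rabbitmq.client.ShutdownSignalException,
// which is likewise an unchecked exception.
class ChannelClosedException extends RuntimeException {
    ChannelClosedException(String message) {
        super(message);
    }
}

// Hypothetical stand-in for the AMQPException that the processor already handles.
class PublishFailedException extends RuntimeException {
    PublishFailedException(String message, Throwable cause) {
        super(message, cause);
    }
}

// Hypothetical, reduced view of the channel operations used by publish().
interface ConfirmingChannel {
    void basicPublish(String exchange, String routingKey, byte[] body);

    boolean waitForConfirms(long timeoutMillis) throws InterruptedException, TimeoutException;
}

// Hypothetical stand-in for the REL_SUCCESS / REL_FAILURE relationships.
enum Route { SUCCESS, FAILURE }

class TranslatingPublisher {
    private final ConfirmingChannel channel;

    TranslatingPublisher(ConfirmingChannel channel) {
        this.channel = channel;
    }

    void publish(String exchange, String routingKey, byte[] body) {
        try {
            channel.basicPublish(exchange, routingKey, body);
            if (!channel.waitForConfirms(5_000)) {
                throw new PublishFailedException("Broker negatively acknowledged the message", null);
            }
        } catch (ChannelClosedException e) {
            // A missing exchange closes the channel; translate that into the
            // failure type the caller already knows how to route.
            throw new PublishFailedException(
                    "Channel closed while publishing to exchange '" + exchange + "': " + e.getMessage(), e);
        } catch (TimeoutException e) {
            throw new PublishFailedException("Timed out waiting for the broker confirm", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PublishFailedException("Interrupted while waiting for the broker confirm", e);
        }
    }
}

class ProcessorSketch {
    // A translated failure is routed to FAILURE instead of escaping as a processor error.
    static Route process(TranslatingPublisher publisher, String exchange, String routingKey, byte[] body) {
        try {
            publisher.publish(exchange, routingKey, body);
            return Route.SUCCESS;
        } catch (PublishFailedException e) {
            return Route.FAILURE;
        }
    }
}
```

With a channel whose `waitForConfirms` throws `ChannelClosedException`, `ProcessorSketch.process(...)` returns `Route.FAILURE`. Without the extra catch clause, the same exception would propagate out of `process`, which corresponds to the "Processor failure" stack trace reported earlier.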

rakesh-rsky force-pushed the fix/NIFI-15483-publishamqp-route-on-failure branch 4 times, most recently from 46fe897 to 89f5fe6 on May 20, 2026 05:32
…r cannot deliver message

PublishAMQP uses mandatory=true on basicPublish() so the broker returns
messages it cannot route to any queue. However, the return arrives
asynchronously via ReturnListener.handleReturn() on the AMQP I/O thread
while the publishing thread had already moved on to session.transfer(REL_SUCCESS).
The UndeliverableMessageLogger only logged a warning — it never signaled
failure back to publish() or onTrigger(), so every unroutable message was
silently counted as a success despite never reaching any consumer.

Fix:
- Enabled Publisher Confirms (channel.confirmSelect()) in the constructor.
  The broker's basic.return frame for an unroutable message is guaranteed
  to arrive before the corresponding confirm frame, so waitForConfirms()
  acts as a synchronization barrier that makes return detection reliable.
- Added an AtomicReference<String> field (undeliverableReturnReason) that
  UndeliverableMessageLogger.handleReturn() populates with exchange/routingKey/
  replyCode/replyText when a message is returned.
- publish() now: resets the field before each call, calls waitForConfirms(5s)
  to synchronize with the broker, then checks the field and throws AMQPException
  if the message was returned — causing onTrigger() to route to REL_FAILURE.
- Broker NACKs (e.g., resource alarm) are also now surfaced as AMQPException
  because waitForConfirms() returns false on NACK.
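
The reset / publish / wait / check sequence in the Fix list can be sketched as below. This is an illustrative model, not the patched AMQPPublisher: `SimulatedBrokerChannel` is a hypothetical in-memory stand-in for a confirm-enabled channel whose single "I/O" thread delivers the return before the confirm, and `UndeliverableException` is a hypothetical stand-in for AMQPException. Only the field name `undeliverableReturnReason` is taken from the commit message.

```java
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical stand-in for AMQPException.
class UndeliverableException extends RuntimeException {
    UndeliverableException(String message) {
        super(message);
    }
}

// Hypothetical in-memory model of a confirm-enabled channel (not the RabbitMQ API).
class SimulatedBrokerChannel implements AutoCloseable {
    private final ExecutorService ioThread = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "simulated-amqp-io");
        t.setDaemon(true);
        return t;
    });
    private final Set<String> boundRoutingKeys;
    private volatile Consumer<String> returnListener = reason -> { };
    private volatile CountDownLatch pendingConfirm = new CountDownLatch(0);

    SimulatedBrokerChannel(Set<String> boundRoutingKeys) {
        this.boundRoutingKeys = boundRoutingKeys;
    }

    void addReturnListener(Consumer<String> listener) {
        this.returnListener = listener;
    }

    void basicPublish(String exchange, String routingKey) {
        CountDownLatch confirm = new CountDownLatch(1);
        pendingConfirm = confirm;
        ioThread.execute(() -> {
            if (!boundRoutingKeys.contains(routingKey)) {
                // The return for an unroutable message is delivered first...
                returnListener.accept("exchange='" + exchange + "', routingKey='" + routingKey
                        + "', replyCode=312, replyText='NO_ROUTE'");
            }
            // ...and the confirm only afterwards.
            confirm.countDown();
        });
    }

    boolean waitForConfirms(long timeoutMillis) throws InterruptedException, TimeoutException {
        if (!pendingConfirm.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
            throw new TimeoutException("No confirm within " + timeoutMillis + " ms");
        }
        return true; // this model never sends a NACK
    }

    @Override
    public void close() {
        ioThread.shutdownNow();
    }
}

class ReturnAwarePublisher {
    private final SimulatedBrokerChannel channel;
    private final AtomicReference<String> undeliverableReturnReason = new AtomicReference<>();

    ReturnAwarePublisher(SimulatedBrokerChannel channel) {
        this.channel = channel;
        // Runs on the simulated I/O thread, like a ReturnListener callback.
        channel.addReturnListener(undeliverableReturnReason::set);
    }

    void publish(String exchange, String routingKey) {
        undeliverableReturnReason.set(null);           // 1. reset before each publish
        channel.basicPublish(exchange, routingKey);    // 2. publish
        try {
            channel.waitForConfirms(5_000);            // 3. barrier: any return has arrived by now
        } catch (TimeoutException e) {
            throw new UndeliverableException("Timed out waiting for confirm");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new UndeliverableException("Interrupted waiting for confirm");
        }
        String reason = undeliverableReturnReason.getAndSet(null);
        if (reason != null) {                          // 4. surface the return as a failure
            throw new UndeliverableException("Message returned as undeliverable: " + reason);
        }
    }
}
```

In this model, publishing with a routing key outside `boundRoutingKeys` makes `publish` throw, while a bound key completes normally. Reading the field only after `waitForConfirms` returns is what removes the race described at the top of this commit message.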

- Added regression tests to verify that AMQPPublisher and PublishAMQP correctly
  route FlowFiles to REL_FAILURE for all broker-side failure modes.

- Added ShutdownSignalException to the catch block in AMQPPublisher.publish(),
  converting the channel-close signal into AMQPException so PublishAMQP routes
  the FlowFile to REL_FAILURE with a descriptive error message.
- Added the ShutdownSignalException import.

Co-authored-by: Rakesh Kumar Singh <rsky.rakesh@gmail.com>
rakesh-rsky force-pushed the fix/NIFI-15483-publishamqp-route-on-failure branch from 89f5fe6 to 99cf2e5 on May 20, 2026 20:34
@turcsanyip
Contributor

turcsanyip commented May 20, 2026

@rakesh-rsky I checked the latest changes and the error handling is now functionally correct.

However, the synchronous wait leads to significant performance degradation compared to the original version. According to my measurements:

  • local NiFi + local RabbitMQ in Docker: 10x slower
  • local NiFi + RabbitMQ on a cloud VM: 300-1000x slower (depending on the cloud location)

So even though the error handling is improved, I don't think we can add the change as-is because of the performance impact. I suggest adding a feature flag property (e.g. Use Message Confirmation = true / false, or Delivery Guarantee = At least once / At most once) with proper documentation of the pros and cons. That way, users can opt in to the extra failure handling at the cost of performance.
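
One possible shape for such a flag, as a rough sketch only. The names `DeliveryGuarantee`, `ConfirmableChannel`, and `GuaranteeAwarePublisher` are hypothetical and are not part of NiFi or the RabbitMQ client; a real implementation would expose the option as a processor property and would presumably also skip enabling confirms on the channel when the flag is off.

```java
// Hypothetical flag values, modeled on the "Delivery Guarantee" suggestion.
enum DeliveryGuarantee { AT_MOST_ONCE, AT_LEAST_ONCE }

// Hypothetical, reduced view of the channel operations involved.
interface ConfirmableChannel {
    void basicPublish(byte[] body);

    boolean waitForConfirms(long timeoutMillis);
}

class GuaranteeAwarePublisher {
    private final ConfirmableChannel channel;
    private final DeliveryGuarantee guarantee;
    private final long confirmTimeoutMillis;

    GuaranteeAwarePublisher(ConfirmableChannel channel, DeliveryGuarantee guarantee, long confirmTimeoutMillis) {
        this.channel = channel;
        this.guarantee = guarantee;
        this.confirmTimeoutMillis = confirmTimeoutMillis;
    }

    void publish(byte[] body) {
        channel.basicPublish(body);
        if (guarantee == DeliveryGuarantee.AT_MOST_ONCE) {
            // Original behavior: fire and forget, no broker round trip per message.
            return;
        }
        // Opt-in behavior: block until the broker confirms, then surface failures.
        if (!channel.waitForConfirms(confirmTimeoutMillis)) {
            throw new IllegalStateException("Broker did not confirm the message");
        }
    }
}
```

With `AT_MOST_ONCE` the blocking call is never made, so throughput stays as before. With `AT_LEAST_ONCE` every publish waits for at least one broker round trip, which is consistent with the slowdown growing with network distance in the measurements above.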
